package org.hawkular.apm.server.kafka;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.hawkular.apm.api.services.Publisher;
import org.hawkular.apm.api.utils.PropertyUtil;
import org.hawkular.apm.server.api.task.ProcessingUnit;
import org.hawkular.apm.server.api.task.Processor;

/* loaded from: input_file:org/hawkular/apm/server/kafka/AbstractConsumerKafka.class */
public class AbstractConsumerKafka<S, T> implements KafkaProcessor {
    private static final int DEFAULT_POLLING_INTERVAL = 100;
    private KafkaConsumer<String, String> consumer;
    private TypeReference<S> typeReference;
    private Publisher<S> retryPublisher;
    private Processor<S, T> processor;
    private Publisher<T> publisher;
    private long pollingInterval = 100;
    private static final Logger log = Logger.getLogger(AbstractConsumerKafka.class.getName());
    private static ObjectMapper mapper = new ObjectMapper();

    public AbstractConsumerKafka(String str, String str2) {
        init(str, str2);
    }

    protected void init(String str, String str2) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", PropertyUtil.getProperty(PropertyUtil.HAWKULAR_APM_URI, "kafka:localhost:9092").substring(PropertyUtil.KAFKA_PREFIX.length()));
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, str2);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, PropertyUtil.getProperty(PropertyUtil.HAWKULAR_APM_KAFKA_CONSUMER_AUTO_COMMIT_INTERVAL, "1000"));
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, PropertyUtil.getProperty(PropertyUtil.HAWKULAR_APM_KAFKA_CONSUMER_SESSION_TIMEOUT, "30000"));
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        String property = PropertyUtil.getProperty(PropertyUtil.HAWKULAR_APM_KAFKA_MAX_POLL_RECORDS);
        if (property != null) {
            properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, property);
        }
        this.consumer = new KafkaConsumer<>(properties);
        this.consumer.subscribe(Arrays.asList(str));
        this.pollingInterval = PropertyUtil.getPropertyAsInteger(PropertyUtil.HAWKULAR_APM_KAFKA_POLLING_INTERVAL, 100).intValue();
    }

    public TypeReference<S> getTypeReference() {
        return this.typeReference;
    }

    public void setTypeReference(TypeReference<S> typeReference) {
        this.typeReference = typeReference;
    }

    public Processor<S, T> getProcessor() {
        return this.processor;
    }

    public void setProcessor(Processor<S, T> processor) {
        this.processor = processor;
    }

    public Publisher<T> getPublisher() {
        return this.publisher;
    }

    public void setPublisher(Publisher<T> publisher) {
        this.publisher = publisher;
    }

    public Publisher<S> getRetryPublisher() {
        return this.retryPublisher;
    }

    public void setRetryPublisher(Publisher<S> publisher) {
        this.retryPublisher = publisher;
    }

    protected long getPollingInterval() {
        return this.pollingInterval;
    }

    @Override // java.lang.Runnable
    public void run() {
        if (getProcessor() == null) {
            return;
        }
        while (true) {
            ConsumerRecords<String, String> poll = this.consumer.poll(getPollingInterval());
            if (!poll.isEmpty()) {
                ArrayList arrayList = new ArrayList();
                Iterator<ConsumerRecord<String, String>> it = poll.iterator();
                while (it.hasNext()) {
                    try {
                        arrayList.add(mapper.readValue(it.next().value(), this.typeReference));
                    } catch (IOException e) {
                        log.log(Level.SEVERE, "Failed to deserialise json", (Throwable) e);
                    }
                }
                try {
                    if (log.isLoggable(Level.FINEST)) {
                        log.finest(getClass().getSimpleName() + ": received " + arrayList.size() + " records after polling " + getPollingInterval() + "ms");
                    }
                    process(null, arrayList, 1);
                } catch (Throwable th) {
                    log.log(Level.SEVERE, "Failed to process records", th);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void process(String str, List<S> list, int i) throws Exception {
        ProcessingUnit processingUnit = new ProcessingUnit();
        processingUnit.setProcessor(getProcessor());
        processingUnit.setRetryCount(i);
        processingUnit.setResultHandler((str2, list2) -> {
            getPublisher().publish(str2, list2, getPublisher().getInitialRetryCount(), getProcessor().getDeliveryDelay(list2));
        });
        processingUnit.setRetryHandler((str3, list3) -> {
            getRetryPublisher().retry(str3, list3, processingUnit.getRetrySubscriber(), processingUnit.getRetryCount() - 1, getProcessor().getRetryDelay(list3, processingUnit.getRetryCount() - 1));
        });
        processingUnit.handle(str, list);
    }
}
